{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the following command to install the Kubeflow Pipelines SDK. If you run this command in a Jupyter\n",
    "notebook, restart the kernel after installing the SDK so that the newly installed version is loaded."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This notebook uses the KFP v1 SDK API (dsl.ContainerOp, dsl.graph_component),\n",
    "# which is not available in kfp 2.x, so keep the SDK on the 1.x release line.\n",
    "%pip install --upgrade \"kfp<2\"\n",
    "# to install tekton compiler uncomment the line below\n",
    "# %pip install kfp_tekton"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Import Packages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import time\n",
    "import yaml\n",
    "\n",
    "import kfp\n",
    "import kfp.components as comp\n",
    "import kfp.dsl as dsl"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Resource kind passed to the k8s get component when looking up the Spark job\n",
    "SPARK_APPLICATION_KIND = \"sparkapplications\"\n",
    "# Value of the \"applicationstate\" output at which status polling stops\n",
    "SPARK_COMPLETED_STATE = \"COMPLETED\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_spark_job_definition():\n",
    "    \"\"\"\n",
    "    Load the Spark Operator job manifest and stamp its job name with the current time.\n",
    "\n",
    "    The manifest is parsed from ``spark-job.yaml`` (resolved against the current\n",
    "    working directory). Its ``metadata.name`` is used as a format template and the\n",
    "    ``epoch`` field is filled with the current Unix time in whole seconds.\n",
    "    :return: dictionary defining the spark job\n",
    "    \"\"\"\n",
    "    with open(\"spark-job.yaml\", \"r\") as manifest_file:\n",
    "        job_manifest = yaml.safe_load(manifest_file)\n",
    "\n",
    "    # Fill the name template with the current epoch second\n",
    "    epoch_seconds = int(time.time())\n",
    "    job_metadata = job_manifest[\"metadata\"]\n",
    "    job_metadata[\"name\"] = job_metadata[\"name\"].format(epoch=epoch_seconds)\n",
    "\n",
    "    return job_manifest"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def print_op(msg):\n",
    "    \"\"\"\n",
    "    Create a pipeline step that echoes ``msg`` from an ``alpine:3.6`` container.\n",
    "    \"\"\"\n",
    "    echo_command = [\"echo\", msg]\n",
    "    return dsl.ContainerOp(name=\"Print message.\", image=\"alpine:3.6\", command=echo_command)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.graph_component  # Graph component decorator is used to annotate recursive functions\n",
    "def graph_component_spark_app_status(input_application_name):\n",
    "    \"\"\"\n",
    "    Recursively check a Spark application until it reports the completed state.\n",
    "\n",
    "    Each pass runs the component loaded from ``k8s-get-component.yaml`` for the given\n",
    "    application and, while its ``applicationstate`` output differs from\n",
    "    SPARK_COMPLETED_STATE, recurses with the ``name`` output of that step.\n",
    "    :param input_application_name: name of the Spark application to look up\n",
    "    \"\"\"\n",
    "    k8s_get_op = comp.load_component_from_file(\"k8s-get-component.yaml\")\n",
    "    check_spark_application_status_op = k8s_get_op(\n",
    "        name=input_application_name,\n",
    "        kind=SPARK_APPLICATION_KIND\n",
    "    )\n",
    "    # Remove cache\n",
    "    check_spark_application_status_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
    "\n",
    "    # NOTE: this function body only runs while the pipeline is being compiled, so a\n",
    "    # Python-level time.sleep() here would stall compilation without spacing out the\n",
    "    # status checks at run time. Any delay between checks has to happen inside a\n",
    "    # pipeline step (for example in the k8s get component itself).\n",
    "    with dsl.Condition(check_spark_application_status_op.outputs[\"applicationstate\"] != SPARK_COMPLETED_STATE):\n",
    "        graph_component_spark_app_status(check_spark_application_status_op.outputs[\"name\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.pipeline(\n",
    "    name=\"Spark Operator job pipeline\",\n",
    "    description=\"Spark Operator job pipeline\"\n",
    ")\n",
    "def spark_job_pipeline():\n",
    "    # Pipeline outline: apply the Spark job manifest, poll its status until it\n",
    "    # completes, then print a completion message.\n",
    "\n",
    "    # Build the manifest and submit it through the kubernetes apply component\n",
    "    spark_job_definition = get_spark_job_definition()\n",
    "    k8s_apply_op = comp.load_component_from_file(\"k8s-apply-component.yaml\")\n",
    "    spark_job_op = k8s_apply_op(object=json.dumps(spark_job_definition))\n",
    "\n",
    "    # Disable caching for the apply step\n",
    "    spark_job_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
    "\n",
    "    # The job name reported by the apply step drives both the polling and the message\n",
    "    spark_job_name = spark_job_op.outputs[\"name\"]\n",
    "\n",
    "    # Poll the application status, starting only after the apply step\n",
    "    spark_application_status_op = graph_component_spark_app_status(spark_job_name)\n",
    "    spark_application_status_op.after(spark_job_op)\n",
    "\n",
    "    # Print the completion message once polling is done; never served from cache\n",
    "    print_message = print_op(f\"Job {spark_job_name} is completed.\")\n",
    "    print_message.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
    "    print_message.after(spark_application_status_op)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Compile and run your pipeline\n",
    "\n",
    "After defining the pipeline in Python as described in the preceding section, use one of the following options to compile the pipeline and submit it to the Kubeflow Pipelines service.\n",
    "\n",
    "#### Option 1: Compile and then upload in the UI\n",
    "\n",
    "1.  Run the following to compile your pipeline and save it as `spark_job_pipeline.yaml`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For Argo (Default)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the pipeline file for the Argo backend (the default); if you use Tekton, use the Tekton block below instead\n",
    "if __name__ == \"__main__\":\n",
    "    import logging\n",
    "\n",
    "    import kfp.compiler as compiler\n",
    "\n",
    "    logging.basicConfig(level=logging.INFO)\n",
    "\n",
    "    # Name the output file after the pipeline function, then compile into it\n",
    "    pipeline_func = spark_job_pipeline\n",
    "    pipeline_filename = f\"{pipeline_func.__name__}.yaml\"\n",
    "    compiler.Compiler().compile(pipeline_func=pipeline_func, package_path=pipeline_filename)\n",
    "    logging.info(f\"Generated pipeline file: {pipeline_filename}.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For Tekton"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# uncomment the block below to create pipeline file for tekton\n",
    "\n",
    "# if __name__ == '__main__':\n",
    "#     from kfp_tekton.compiler import TektonCompiler\n",
    "#     import logging\n",
    "#     logging.basicConfig(level=logging.INFO)\n",
    "#     pipeline_func = spark_job_pipeline\n",
    "#     pipeline_filename = pipeline_func.__name__ + \".yaml\"\n",
    "#     TektonCompiler().compile(pipeline_func, pipeline_filename)\n",
    "#     logging.info(f\"Generated pipeline file: {pipeline_filename}.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2.  Upload and run your `spark_job_pipeline.yaml` using the Kubeflow Pipelines user interface.\n",
    "See the guide to [getting started with the UI][quickstart].\n",
    "\n",
    "[quickstart]: https://www.kubeflow.org/docs/components/pipelines/overview/quickstart"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Option 2: Run the pipeline using the Kubeflow Pipelines SDK client\n",
    "\n",
    "1.  Create an instance of the [`kfp.Client` class][kfp-client] by following the steps in [connecting to Kubeflow Pipelines using the SDK client][connect-api].\n",
    "\n",
    "[kfp-client]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html#kfp.Client\n",
    "[connect-api]: https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Connect to the Kubeflow Pipelines API (change the constructor arguments to match your deployment)\n",
    "client = kfp.Client()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The pipeline takes no parameters, so submit it with an empty arguments mapping;\n",
    "# `arguments` has no default in the KFP v1 SDK and must be passed explicitly.\n",
    "client.create_run_from_pipeline_func(\n",
    "    spark_job_pipeline,\n",
    "    arguments={},\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
